/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.api.service.impl;

import com.chinamobile.cmss.lakehouse.api.dto.JobBean;
import com.chinamobile.cmss.lakehouse.api.scheduler.job.FlinkXSubmitJob;
import com.chinamobile.cmss.lakehouse.api.service.JobService;
import com.chinamobile.cmss.lakehouse.api.service.LinkService;
import com.chinamobile.cmss.lakehouse.api.service.LinkServiceSelector;
import com.chinamobile.cmss.lakehouse.api.service.UserService;
import com.chinamobile.cmss.lakehouse.common.Constants;
import com.chinamobile.cmss.lakehouse.common.dto.flinkx.ContentConfig;
import com.chinamobile.cmss.lakehouse.common.dto.flinkx.DataTransferConfig;
import com.chinamobile.cmss.lakehouse.common.dto.flinkx.FieldConf;
import com.chinamobile.cmss.lakehouse.common.dto.flinkx.JobConfig;
import com.chinamobile.cmss.lakehouse.common.dto.flinkx.ReaderConfig;
import com.chinamobile.cmss.lakehouse.common.dto.flinkx.SettingConfig;
import com.chinamobile.cmss.lakehouse.common.dto.flinkx.SpeedConfig;
import com.chinamobile.cmss.lakehouse.common.dto.flinkx.WriterConfig;
import com.chinamobile.cmss.lakehouse.common.enums.JobRunningStatusEnum;
import com.chinamobile.cmss.lakehouse.common.enums.JobTypeEnum;
import com.chinamobile.cmss.lakehouse.common.enums.ScheduleModelEnum;
import com.chinamobile.cmss.lakehouse.common.enums.ScheduleStatusEnum;
import com.chinamobile.cmss.lakehouse.common.enums.Status;
import com.chinamobile.cmss.lakehouse.common.utils.BeanUtils;
import com.chinamobile.cmss.lakehouse.common.utils.ParameterUtils;
import com.chinamobile.cmss.lakehouse.common.utils.Result;
import com.chinamobile.cmss.lakehouse.dao.DataSourceDao;
import com.chinamobile.cmss.lakehouse.dao.JobDao;
import com.chinamobile.cmss.lakehouse.dao.JobHistoryDao;
import com.chinamobile.cmss.lakehouse.dao.JobTaskHistoryDao;
import com.chinamobile.cmss.lakehouse.dao.ScheduleInfoDao;
import com.chinamobile.cmss.lakehouse.dao.entity.DataSourceEntity;
import com.chinamobile.cmss.lakehouse.dao.entity.JobEntity;
import com.chinamobile.cmss.lakehouse.dao.entity.JobHistoryEntity;
import com.chinamobile.cmss.lakehouse.dao.entity.JobTaskHistoryEntity;
import com.chinamobile.cmss.lakehouse.dao.entity.ScheduleInfoEntity;
import com.chinamobile.cmss.lakehouse.service.quartz.QuartzExecutor;

import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;

import javax.annotation.Resource;
import javax.persistence.criteria.Predicate;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobDataMap;
import org.quartz.ScheduleBuilder;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Slf4j
public class JobServiceImpl extends BaseServiceImpl implements JobService {
    public static final String JOB_NAME_PREFIX = "flinkx_";
    public static final String JOB_GROUP = "flinkx";

    @Resource
    private JobDao jobDao;
    @Resource
    private ScheduleInfoDao scheduleInfoDao;
    @Resource
    private JobHistoryDao jobHistoryDao;
    @Resource
    private JobTaskHistoryDao jobTaskHistoryDao;
    @Resource
    private DataSourceDao dataSourceDao;
    @Resource
    private LinkServiceSelector linkServiceSelector;

    @Autowired
    private QuartzExecutor executor;
    @Autowired
    private UserService userService;

    /**
     * Creates a new job together with its schedule info.
     *
     * @param userId  id of the creating user; job names are unique per creator
     * @param jobBean incoming job definition (carries a nested schedule info)
     * @return result map with the saved entity under {@code Constants.DATA_LIST},
     *         or a JOB_NAME_EXIST status when the name is already taken
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public Map<String, Object> create(String userId, JobBean jobBean) {
        Map<String, Object> result = new HashMap<>();

        // Reject duplicate job names within this user's scope.
        Optional<JobEntity> duplicate = jobDao.findByNameAndCreateUserId(jobBean.getName(), userId);
        if (duplicate.isPresent()) {
            putMessage(result, Status.JOB_NAME_EXIST);
            return result;
        }

        // Persist the schedule first so the job row can reference it.
        ScheduleInfoEntity scheduleInfo = buildScheduleInfoEntity(jobBean);
        scheduleInfoDao.save(scheduleInfo);

        JobEntity job = buildJobEntity(userId, jobBean);
        job.setScheduleInfo(scheduleInfo);
        JobEntity saved = jobDao.save(job);

        result.put(Constants.DATA_LIST, saved);
        putMessage(result, Status.SUCCESS);
        return result;
    }

    /**
     * Looks up a single job owned by the given user.
     *
     * @param userId id of the requesting user (jobs are scoped per creator)
     * @param id     job id
     * @return result map with the job entity under {@code Constants.DATA_LIST},
     *         or a JOB_NOT_EXIST status when no matching job is found
     */
    @Override
    public Map<String, Object> get(String userId, Long id) {
        Map<String, Object> result = new HashMap<>();

        Optional<JobEntity> jobEntity = jobDao.findByIdAndCreateUserId(id, userId);
        if (!jobEntity.isPresent()) {
            putMessage(result, Status.JOB_NOT_EXIST);
            return result;
        }

        // Unwrap the Optional: create/update put the entity itself into the result,
        // so returning the Optional wrapper here produced an inconsistent payload.
        result.put(Constants.DATA_LIST, jobEntity.get());
        putMessage(result, Status.SUCCESS);
        return result;
    }

    /**
     * Updates an existing job and its attached schedule info.
     *
     * @param userId  id of the updating user
     * @param id      job id
     * @param jobBean new field values; its nested schedule info replaces the stored one
     * @return result map with the saved entity, or JOB_NOT_EXIST / JOB_NAME_EXIST status
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public Map<String, Object> update(String userId, Long id, JobBean jobBean) {
        Map<String, Object> result = new HashMap<>();

        // Bug fix: getById() returns a lazy reference and never yields null for a
        // missing row (it throws EntityNotFoundException on first access), so the
        // null-check could never fire. findById() makes the existence check real.
        JobEntity existEntity = jobDao.findById(id).orElse(null);
        if (existEntity == null) {
            putMessage(result, Status.JOB_NOT_EXIST);
            return result;
        }

        // On rename, ensure the new name is not taken by another job of this user.
        if (!StringUtils.equals(jobBean.getName(), existEntity.getName())) {
            Optional<JobEntity> sameNameEntity = jobDao.findByNameAndCreateUserId(jobBean.getName(), userId);
            if (sameNameEntity.isPresent() && (!sameNameEntity.get().getId().equals(existEntity.getId()))) {
                putMessage(result, Status.JOB_NAME_EXIST);
                return result;
            }
        }

        // Copy the incoming schedule info onto the persisted row, keeping its id.
        ScheduleInfoEntity scheduleInfoEntity = scheduleInfoDao.getById(existEntity.getScheduleInfo().getId());
        jobBean.getScheduleInfo().setId(scheduleInfoEntity.getId());
        BeanUtils.copyProperties(jobBean.getScheduleInfo(), scheduleInfoEntity);
        scheduleInfoDao.save(scheduleInfoEntity);

        Date now = new Date();
        jobBean.setId(existEntity.getId());
        // Null the nested schedule so the property copy below does not clobber the
        // already-updated schedule reference on the entity.
        jobBean.setScheduleInfo(null);
        BeanUtils.copyProperties(jobBean, existEntity);
        existEntity.setUpdateUserId(userId);
        existEntity.setUpdateTime(now);
        existEntity = jobDao.save(existEntity);

        result.put(Constants.DATA_LIST, existEntity);
        putMessage(result, Status.SUCCESS);
        return result;
    }

    /**
     * Deletes a job and its owned schedule info row.
     *
     * @param userId id of the requesting user
     * @param id     job id
     * @return result map with SUCCESS, or JOB_NOT_EXIST when the job is missing
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public Map<String, Object> delete(String userId, Long id) {
        Map<String, Object> result = new HashMap<>();

        // Bug fix: getById() never returns null for a missing row (it hands back a
        // lazy proxy that throws on first access), so the null-check below was
        // unreachable. findById() performs a real existence lookup.
        JobEntity existEntity = jobDao.findById(id).orElse(null);
        if (existEntity == null) {
            putMessage(result, Status.JOB_NOT_EXIST);
            return result;
        }

        // Remove the owned schedule row first, then the job itself.
        scheduleInfoDao.deleteById(existEntity.getScheduleInfo().getId());
        jobDao.delete(existEntity);
        putMessage(result, Status.SUCCESS);
        return result;
    }

    /**
     * Pages through the current user's jobs, optionally filtered by a name substring.
     *
     * @param userId    id of the requesting user
     * @param searchVal optional name filter; blank means no filtering
     * @param pageNo    1-based page number
     * @param pageSize  page size
     * @return result wrapping a page of jobs ordered by id descending
     */
    @Override
    public Result list(String userId, String searchVal, Integer pageNo, Integer pageSize) {
        Result<Object> result = new Result<>();

        // Page numbers arrive 1-based; Spring Data pages are 0-based.
        PageRequest pageRequest = PageRequest.of(pageNo - 1, pageSize, Sort.Direction.DESC, "id");
        Page<JobEntity> pageInfo = jobDao.findAll(buildSpecification(userId, searchVal), pageRequest);

        result.setData(pageInfo);
        putMessage(result, Status.SUCCESS);
        return result;
    }

    /**
     * Turns scheduling on for a job by registering a Quartz job with a one-shot or
     * cyclic trigger depending on the configured schedule model.
     *
     * @param userId id of the requesting user (currently unused here)
     * @param id     job id
     * @return result map with SUCCESS, or JOB_NOT_EXIST / JOB_SCHEDULE_STATUS /
     *         JOB_SCHEDULE_TIME_ERR on validation failure
     */
    @Override
    public Map<String, Object> scheduleOn(String userId, Long id) {
        Map<String, Object> result = new HashMap<>();

        Optional<JobEntity> existEntity = jobDao.findById(id);
        if (!existEntity.isPresent()) {
            putMessage(result, Status.JOB_NOT_EXIST);
            return result;
        }

        JobEntity job = existEntity.get();
        ScheduleInfoEntity scheduleInfo = job.getScheduleInfo();
        if (scheduleInfo.getScheduleStatus() == ScheduleStatusEnum.ON) {
            putMessage(result, Status.JOB_SCHEDULE_STATUS, ScheduleStatusEnum.ON);
            return result;
        }

        // Bug fix: a null start time is legal (buildOnceTrigger falls back to "now"),
        // but the old code dereferenced it unconditionally and would NPE. Only reject
        // explicit start times that already lie in the past.
        Date startTime = scheduleInfo.getScheduleStartTime();
        if (startTime != null && startTime.before(new Date())) {
            putMessage(result, Status.JOB_SCHEDULE_TIME_ERR);
            return result;
        }

        JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put("id", job.getId());
        String jobName = JOB_NAME_PREFIX + job.getId();
        Trigger trigger = scheduleInfo.getScheduleModel() == ScheduleModelEnum.SCHEDULE_ONCE
                ? buildOnceTrigger(scheduleInfo, jobName, JOB_GROUP)
                : buildCycleTrigger(scheduleInfo, jobName, JOB_GROUP);
        executor.addJob(FlinkXSubmitJob.class, jobName, JOB_GROUP, trigger, jobDataMap);

        // NOTE(review): the persisted schedule status is not flipped to ON here —
        // confirm whether FlinkXSubmitJob (or another component) updates it.
        putMessage(result, Status.SUCCESS);
        return result;
    }

    /**
     * Turns scheduling off for a job by removing its Quartz job definition.
     *
     * @param userId id of the requesting user (currently unused here)
     * @param id     job id
     * @return result map with SUCCESS, or JOB_NOT_EXIST / JOB_SCHEDULE_STATUS /
     *         JOB_SCHEDULE_OFF_ERR on failure
     */
    @Override
    public Map<String, Object> scheduleOff(String userId, Long id) {
        Map<String, Object> result = new HashMap<>();

        Optional<JobEntity> existEntity = jobDao.findById(id);
        if (!existEntity.isPresent()) {
            putMessage(result, Status.JOB_NOT_EXIST);
            return result;
        }

        // Already off: report the current status instead of touching Quartz.
        if (existEntity.get().getScheduleInfo().getScheduleStatus() == ScheduleStatusEnum.OFF) {
            putMessage(result, Status.JOB_SCHEDULE_STATUS, ScheduleStatusEnum.OFF);
            return result;
        }

        String jobName = JOB_NAME_PREFIX + existEntity.get().getId();
        boolean removed = executor.deleteJob(jobName, JOB_GROUP);
        putMessage(result, removed ? Status.SUCCESS : Status.JOB_SCHEDULE_OFF_ERR);
        return result;
    }

    /**
     * Entry point invoked by the Quartz job to submit a FlinkX sync job.
     *
     * <p>Only OFFLINE jobs are handled at the moment; other job types fall through
     * and the history row stays in its initial QUEUING state.
     *
     * @param id id of the job to submit
     */
    @Override
    public void submitFlinkXJob(Long id) {
        // Bug fix: getById() returns a lazy proxy that only fails on first access
        // when the row is missing; guard explicitly with findById() instead.
        JobEntity jobEntity = jobDao.findById(id).orElse(null);
        if (jobEntity == null) {
            log.warn("submit flinkx job[{}] skipped: job not found.", id);
            return;
        }
        JobHistoryEntity jobHistory = saveJobHistory(jobEntity);

        int runningSucceedTasks = 0;
        int planTasks = 0;
        // Init job config.
        try {
            DataSourceEntity sourceEntity = dataSourceDao.getById(jobEntity.getSourceId());
            DataSourceEntity targetEntity = dataSourceDao.getById(jobEntity.getTargetId());
            LinkService sourceLinkService = linkServiceSelector.getLinkService(sourceEntity.getDataSourceType());
            LinkService targetLinkService = linkServiceSelector.getLinkService(targetEntity.getDataSourceType());
            String userName = userService.findByUserId(jobEntity.getCreateUserId()).getUserName();
            if (jobEntity.getJobType() == JobTypeEnum.OFFLINE) {
                // One FlinkX task is planned per source table.
                List<String> syncTables = sourceLinkService.queryTablePath(sourceEntity, jobEntity.getSourceParam());
                planTasks = syncTables.size();
                jobHistory.setSubTaskNum(planTasks);
                runningSucceedTasks = buildOfflineJob(jobEntity, sourceEntity, targetEntity, sourceLinkService,
                        targetLinkService, syncTables, jobHistory, userName);
            }
        } catch (Exception e) {
            log.warn("submit flinkx job[{}] err.", jobEntity.getId(), e);
        } finally {
            // TODO finalize jobHistory (end time / running status) from
            // runningSucceedTasks vs planTasks once task watching is implemented.
            log.info("submit flinkx job[{}] end.", jobEntity.getId());
        }
    }

    /**
     * Persists a new job-level history row in QUEUING state.
     *
     * @param jobEntity job being submitted
     * @return the persisted history entity (with its generated id)
     */
    private JobHistoryEntity saveJobHistory(JobEntity jobEntity) {
        JobHistoryEntity jobHistory = new JobHistoryEntity();
        jobHistory.setJobId(jobEntity.getId());
        jobHistory.setScheduleModel(jobEntity.getScheduleInfo().getScheduleModel());
        jobHistory.setJobRunningStatus(JobRunningStatusEnum.QUEUING);
        jobHistory.setStartTime(new Date());

        // Bug fix: the entity was saved twice back-to-back; one save suffices.
        return jobHistoryDao.save(jobHistory);
    }

    /**
     * Builds a fire-once Quartz trigger, falling back to "now" when the schedule
     * has no configured start time.
     *
     * @param scheduleInfo schedule configuration
     * @param jobName      Quartz trigger name
     * @param jobGroup     Quartz trigger group
     * @return the built trigger
     */
    private Trigger buildOnceTrigger(ScheduleInfoEntity scheduleInfo, String jobName, String jobGroup) {
        Date startAt = scheduleInfo.getScheduleStartTime() != null
                ? scheduleInfo.getScheduleStartTime()
                : new Date();
        return TriggerBuilder.newTrigger()
                .withIdentity(jobName, jobGroup)
                .startAt(startAt)
                .build();
    }

    /**
     * Builds a recurring Quartz trigger from the schedule info.
     *
     * <p>Monthly schedules use a cron expression pinned to the start time's
     * day-of-month/hour/minute; any other duration unit is expressed as a
     * fixed minute-interval simple schedule.
     *
     * @param scheduleInfo schedule configuration; start time anchors the cycle,
     *                     end time (optional) bounds it
     * @param jobName      Quartz trigger name
     * @param jobGroup     Quartz trigger group
     * @return the built trigger
     */
    private Trigger buildCycleTrigger(ScheduleInfoEntity scheduleInfo, String jobName, String jobGroup) {
        // Bug fix: the cycle anchor and startAt() previously used the END time,
        // which made the trigger start (and, with endAt, terminate) at the end of
        // its own window. A cycle must anchor on the START time.
        DateTime startDateTime = new DateTime(scheduleInfo.getScheduleStartTime());
        TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();

        ScheduleBuilder scheduleBuilder;
        if (ChronoUnit.MONTHS == scheduleInfo.getDurationUnit()) {
            scheduleBuilder = CronScheduleBuilder
                    .monthlyOnDayAndHourAndMinute(startDateTime.getDayOfMonth(), startDateTime.getHourOfDay(), startDateTime.getMinuteOfHour())
                    .withMisfireHandlingInstructionFireAndProceed();
        } else {
            // Convert the duration unit into a minute interval.
            scheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
                    .withIntervalInMinutes(Ints.checkedCast(scheduleInfo.getDurationUnit().getDuration().getSeconds() / 60))
                    .repeatForever()
                    .withMisfireHandlingInstructionNextWithRemainingCount();
        }

        triggerBuilder.withIdentity(jobName, jobGroup)
                .startAt(scheduleInfo.getScheduleStartTime());
        if (null != scheduleInfo.getScheduleEndTime()) {
            triggerBuilder.endAt(scheduleInfo.getScheduleEndTime());
        }
        triggerBuilder.withSchedule(scheduleBuilder);

        return triggerBuilder.build();
    }

    /**
     * Maps the bean onto a fresh JobEntity, stamping creator/updater and timestamps.
     * Note: mutates the passed-in bean before copying.
     *
     * @param userId  user recorded as both creator and updater
     * @param jobBean source bean
     * @return the populated (not yet persisted) entity
     */
    private JobEntity buildJobEntity(String userId, JobBean jobBean) {
        Date now = new Date();
        jobBean.setCreateUserId(userId);
        jobBean.setUpdateUserId(userId);
        jobBean.setCreateTime(now);
        jobBean.setUpdateTime(now);
        return BeanUtils.copyFrom(jobBean, JobEntity.class);
    }

    /** Maps the bean's nested schedule info onto a fresh (not yet persisted) entity. */
    private ScheduleInfoEntity buildScheduleInfoEntity(JobBean jobBean) {
        return BeanUtils.copyFrom(jobBean.getScheduleInfo(), ScheduleInfoEntity.class);
    }

    /**
     * Builds a JPA specification restricting jobs to the given creator and,
     * when a search value is present, to names containing it (special chars escaped).
     */
    private Specification<JobEntity> buildSpecification(String userId, String searchVal) {
        return (root, query, builder) -> {
            // Always scope by creator.
            Predicate predicate = builder.and(builder.equal(root.get("createUserId"), userId));
            if (StringUtils.isNotBlank(searchVal)) {
                String pattern = "%" + ParameterUtils.replaceSpecialChars(searchVal) + "%";
                predicate = builder.and(predicate, builder.like(root.get("name"), pattern));
            }
            return predicate;
        };
    }

    /**
     * Builds and submits one FlinkX task per source table for an offline sync job.
     * A failing table records a FAILURE task history but does not stop the loop.
     *
     * @param jobEntity         job being submitted
     * @param source            source data source entity
     * @param target            target data source entity
     * @param sourceLinkService link service for the source type
     * @param targetLinkService link service for the target type
     * @param syncTables        tables to synchronize
     * @param jobHistory        job-level history row the tasks attach to
     * @param userName          name of the job's creator, used for target table checks
     * @return number of tasks that were submitted without error
     */
    private int buildOfflineJob(JobEntity jobEntity, DataSourceEntity source, DataSourceEntity target, LinkService sourceLinkService,
                                LinkService targetLinkService, List<String> syncTables, JobHistoryEntity jobHistory, String userName) {
        int runningSucceedTasks = 0;
        ReaderConfig readerConfig = sourceLinkService.getReaderConfig(source);
        WriterConfig writerConfig = targetLinkService.getWriterConfig(target);
        // NOTE(review): the latch is only counted down on failure and never awaited;
        // it becomes meaningful once the "wait and watch" TODO below lands.
        final CountDownLatch latch = new CountDownLatch(syncTables.size());

        String sourceSchema = sourceLinkService.getTablePathSchema(source.getDataSourceType(), jobEntity.getSourceParam());
        String targetSchema = targetLinkService.getTablePathSchema(target.getDataSourceType(), jobEntity.getTargetParam());

        for (String table : syncTables) {
            //---------------------STEP 1. Init config and save taskHistory---------------------
            JobTaskHistoryEntity taskHistory = saveTaskHistory(jobEntity, jobHistory, "");
            try {
                //---------------------STEP 2. Reader config---------------------
                String sourceTablePath = sourceSchema + table;
                // Update reader
                List<FieldConf> columns = sourceLinkService.getMetadata(source, sourceTablePath, null);
                sourceLinkService.updateReaderConfig(readerConfig, table, columns, sourceSchema);

                //---------------------STEP 3. Writer config---------------------
                // NOTE(review): creating the table when checkTablePathExist() is TRUE
                // looks inverted for the method name — confirm the helper's contract.
                if (targetLinkService.checkTablePathExist(target, targetSchema, table, userName)) {
                    targetLinkService.createNewTablePath(target, targetSchema, table, userName, jobEntity.getTargetParam(), columns);
                }
                targetLinkService.updateWriteConfig(writerConfig, table, columns, targetSchema);

                //---------------------STEP 4. Init flinkx job config---------------------
                DataTransferConfig dataTransferConfig = buildDataTransferConfig(readerConfig, writerConfig);
                String dataTransferConfStr = JSONObject.toJSONString(dataTransferConfig,
                        SerializerFeature.DisableCircularReferenceDetect);
                taskHistory.setFlinkxConfig(dataTransferConfStr);
                // Submit the task and record the address it was submitted to.
                taskHistory.setFlinkWebUrl(submitFlinkxTask(taskHistory));
                // STEP 5. Submit flinkx job to k8s.
                log.info(">>>Submit job[{}] subTaskHistory[{}]", jobEntity.getId(), taskHistory.getId());
                // TODO Add watch
                runningSucceedTasks++;
            } catch (Exception e) {
                taskHistory.setEndTime(new Date());
                taskHistory.setJobRunningStatus(JobRunningStatusEnum.FAILURE);
                latch.countDown();
                // Bug fix: with a "{}" placeholder consuming the exception argument,
                // SLF4J formats e via toString() and drops the stack trace. Pass the
                // Throwable as a trailing arg with no placeholder instead.
                log.error(">>>Submit job[{}] subTaskHistory[{}] err.", jobEntity.getId(), taskHistory.getId(), e);
            } finally {
                // Persist the task outcome whether submission succeeded or not.
                jobTaskHistoryDao.save(taskHistory);
            }
        }

        // TODO Wait and watch
        return runningSucceedTasks;
    }

    /**
     * Creates and persists a task-history row for one table sync task.
     *
     * <p>The entity is deliberately saved twice: the first save generates the
     * database id, which is then required to derive the k8s cluster id.
     *
     * @param job          parent job (its id feeds the cluster id)
     * @param jobHistory   job-level history row this task belongs to
     * @param flinkxConfig serialized FlinkX config (may still be empty at this point)
     * @return the persisted task history with its cluster id populated
     */
    private JobTaskHistoryEntity saveTaskHistory(JobEntity job, JobHistoryEntity jobHistory, String flinkxConfig) {
        JobTaskHistoryEntity taskHistory = new JobTaskHistoryEntity();
        taskHistory.setJobHistoryId(jobHistory.getId());
        taskHistory.setStartTime(new Date());
        taskHistory.setFlinkxConfig(flinkxConfig);
        taskHistory.setJobRunningStatus(JobRunningStatusEnum.RUNNING);
        // First save: obtain the generated id used in the cluster id below.
        jobTaskHistoryDao.save(taskHistory);

        // k8s ClusterID
        taskHistory.setClusterId(appendFlinkClusterId(job.getId(), jobHistory.getId(), taskHistory.getId()));
        jobTaskHistoryDao.save(taskHistory);

        return taskHistory;
    }

    /**
     * Builds the Flink-on-k8s cluster id for a task.
     *
     * <p>Segments are joined with "-" because underscores are not supported
     * in Kubernetes resource names.
     *
     * @param jobId         job id
     * @param jobHistoryId  job-level history id
     * @param taskHistoryId task-level history id
     * @return cluster id of the form {@code <prefix>-<jobId>-<jobHistoryId>-<taskHistoryId>}
     */
    private String appendFlinkClusterId(Long jobId, Long jobHistoryId, Long taskHistoryId) {
        return Constants.SYNC_JOB_PREFIX + "-" + jobId + "-" + jobHistoryId + "-" + taskHistoryId;
    }

    /**
     * Assembles a FlinkX data-transfer config from the given reader/writer
     * configs, using default speed settings.
     *
     * @param readerConfig reader side of the transfer
     * @param writerConfig writer side of the transfer
     * @return the assembled transfer config
     */
    private DataTransferConfig buildDataTransferConfig(ReaderConfig readerConfig, WriterConfig writerConfig) {
        JobConfig jobConfig = new JobConfig();
        jobConfig.setContent(Lists.newArrayList(new ContentConfig(readerConfig, writerConfig)));

        // Defaults only — no throttling beyond SpeedConfig's own defaults.
        SettingConfig settingConfig = new SettingConfig();
        settingConfig.setSpeed(new SpeedConfig());
        jobConfig.setSetting(settingConfig);

        DataTransferConfig dataTransferConfig = new DataTransferConfig();
        dataTransferConfig.setJob(jobConfig);
        return dataTransferConfig;
    }

    /**
     * Submits a single FlinkX task to Kubernetes and returns its Flink web URL.
     *
     * @param taskHistory task to submit (carries the serialized FlinkX config)
     * @return the Flink web URL of the submitted task; currently always empty
     *         because submission is not implemented yet
     */
    private String submitFlinkxTask(JobTaskHistoryEntity taskHistory) {
        // TODO Submit Flinkx k8s task.
        return "";
    }
}
